/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#define USING_LOG_PREFIX SERVER

#include "lib/objectpool/ob_resource_pool.h"
#include "ob_inner_sql_connection_pool.h"
#include "sql/session/ob_sql_session_mgr.h"
#include "sql/ob_sql.h"

namespace oceanbase {
using namespace common;
using namespace share;
using namespace share::schema;
using namespace sql;

namespace observer {
ObInnerSQLConnectionPool::ObInnerSQLConnectionPool()
    : inited_(false),
      stop_(false),
      total_conn_cnt_(0),
      free_conn_list_(),
      used_conn_list_(),
      allocator_(ObModIds::OB_INNER_SQL_CONN_POOL),
      schema_service_(NULL),
      partition_table_operator_(NULL),
      ob_sql_(NULL),
      vt_iter_creator_(NULL),
      config_(NULL)
{}

ObInnerSQLConnectionPool::~ObInnerSQLConnectionPool()
{
  int ret = OB_SUCCESS;
  if (inited_) {
    ObThreadCondGuard guard(cond_);
    if (free_conn_list_.get_size() != total_conn_cnt_) {
      LOG_ERROR("not all connection been freed", K_(total_conn_cnt), "free_conn_cnt", free_conn_list_.get_size());
    }

    while (!free_conn_list_.is_empty()) {
      LinkNode* node = free_conn_list_.remove_first();
      if (NULL == node) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("NULL connection", K(ret));
      } else {
        node->~LinkNode();
        allocator_.free(node);
        node = NULL;
      }
    }
  }
}

int ObInnerSQLConnectionPool::init(ObMultiVersionSchemaService* schema_service, ObSql* ob_sql,
    ObVTIterCreator* vt_iter_creator, const ObPartitionTableOperator* partition_table_operator,
    common::ObServerConfig* config)
{
  int ret = OB_SUCCESS;
  if (inited_) {
    ret = OB_INIT_TWICE;
    LOG_WARN("init twice", K(ret));
  } else if (NULL == schema_service || NULL == ob_sql || NULL == vt_iter_creator || NULL == partition_table_operator) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN(
        "invalid argument", K(ret), KP(schema_service), KP(ob_sql), KP(vt_iter_creator), KP(partition_table_operator));
  } else if (OB_FAIL(cond_.init(ObWaitEventIds::INNER_CONNECTION_POOL_COND_WAIT))) {
    LOG_WARN("fail to init cond, ", K(ret));
  } else {
    schema_service_ = schema_service;
    ob_sql_ = ob_sql;
    vt_iter_creator_ = vt_iter_creator;
    partition_table_operator_ = partition_table_operator;
    config_ = config;
    inited_ = true;
  }
  return ret;
}

int ObInnerSQLConnectionPool::acquire(
    common::sqlclient::ObISQLConnection*& conn, const int64_t cluster_id, ObISQLClient* client_addr)
{
  UNUSED(cluster_id);
  return acquire(conn, client_addr);
}

int ObInnerSQLConnectionPool::acquire(common::sqlclient::ObISQLConnection*& conn, ObISQLClient* client_addr)
{
  int ret = OB_SUCCESS;
  ObInnerSQLConnection* inner_sql_conn = NULL;
  if (!inited_) {
    ret = OB_NOT_INIT;
    LOG_WARN("not inited", K(ret));
  } else if (OB_FAIL(alloc_conn(inner_sql_conn))) {
    LOG_WARN("alloc connection from pool failed", K(ret));
  } else if (OB_FAIL(inner_sql_conn->init(this,
                 schema_service_,
                 ob_sql_,
                 vt_iter_creator_,
                 partition_table_operator_,
                 config_,
                 NULL /* session_info */,
                 client_addr))) {
    LOG_WARN("init connection failed", K(ret));
  } else {
    inner_sql_conn->ref();
    conn = inner_sql_conn;
  }

  if (OB_FAIL(ret)) {
    if (NULL != inner_sql_conn) {
      int tmp_ret = inner_sql_conn->destroy();
      if (OB_SUCCESS != tmp_ret) {
        LOG_WARN("destroy connection failed", "ret", tmp_ret);
      }
      // continue executing while destroy error.
      tmp_ret = free_conn(inner_sql_conn);
      if (OB_SUCCESS != tmp_ret) {
        LOG_WARN("free connection failed", "ret", tmp_ret);
      }
    }
  }

  return ret;
}

//@notice: performance optimization
// spi inner sql connection will be called frequently by PL(test in TPCC PL)
// before this, spi inner sql connection was management by inner sql connection pool
// when acquire inner sql connection, need wrlock to protect concurrency problem
// this action has serious performance problems
// cache SPI inner sql connection to ObServerObjectPool
// ObServerObjectPool has independent allocator on each core
// so it can reduce the conflict of threads acquiring spi connection
int ObInnerSQLConnectionPool::acquire_spi_conn(sql::ObSQLSessionInfo* session_info, ObInnerSQLConnection*& conn)
{
  int ret = OB_SUCCESS;
  if (OB_ISNULL(conn = rp_alloc(ObInnerSQLConnection, ObInnerSQLConnection::LABEL))) {
    ret = OB_ALLOCATE_MEMORY_FAILED;
    LOG_WARN("allocate spi connection failed", K(ret));
  } else if (OB_FAIL(conn->init(
                 this, schema_service_, ob_sql_, vt_iter_creator_, partition_table_operator_, config_, session_info))) {
    LOG_WARN("init connection failed", K(ret));
  } else {
    conn->ref();
    conn->set_spi_connection(true);
  }
  return ret;
}

int ObInnerSQLConnectionPool::acquire(sql::ObSQLSessionInfo* session_info, common::sqlclient::ObISQLConnection*& conn)
{
  int ret = OB_SUCCESS;
  ObInnerSQLConnection* inner_sql_conn = NULL;
  if (!inited_) {
    ret = OB_NOT_INIT;
    LOG_WARN("not inited", K(ret));
  } else if (OB_FAIL(alloc_conn(inner_sql_conn))) {
    LOG_WARN("alloc connection from pool failed", K(ret));
  } else if (OB_FAIL(inner_sql_conn->init(
                 this, schema_service_, ob_sql_, vt_iter_creator_, partition_table_operator_, config_, session_info))) {
    LOG_WARN("init connection failed", K(ret));
  } else {
    if (0 != inner_sql_conn->get_ref()) {
      LOG_WARN("ref is not ZERO after acquire", KP(inner_sql_conn), "ref_cnt", inner_sql_conn->get_ref(), K(lbt()));
    }
    inner_sql_conn->ref();
    conn = inner_sql_conn;
  }

  if (OB_FAIL(ret)) {
    if (NULL != inner_sql_conn) {
      int tmp_ret = inner_sql_conn->destroy();
      if (OB_SUCCESS != tmp_ret) {
        LOG_WARN("destroy connection failed", "ret", tmp_ret);
      }
      // continue executing while destroy error.
      tmp_ret = free_conn(inner_sql_conn);
      if (OB_SUCCESS != tmp_ret) {
        LOG_WARN("free connection failed", "ret", tmp_ret);
      }
    }
  }
  return ret;
}

int ObInnerSQLConnectionPool::release(common::sqlclient::ObISQLConnection* conn, const bool success)
{
  // alway try to destroy connection, ignore success flag.
  UNUSED(success);
  int ret = OB_SUCCESS;
  if (!inited_) {
    ret = OB_NOT_INIT;
    LOG_WARN("not inited", K(ret));
  } else if (NULL == conn) {
    // ignore NULL connection release
  } else {
    static_cast<ObInnerSQLConnection*>(conn)->unref();
  }
  return ret;
}

int ObInnerSQLConnectionPool::revert(ObInnerSQLConnection* conn)
{
  int ret = OB_SUCCESS;
  if (!inited_) {
    LOG_WARN("not init", K(ret));
  } else if (NULL == conn) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(ret));
  } else {
    if (conn->is_spi_conn()) {
      // spi connection come from ObServerObjectPool, so release it to ObServerObjectPool
      rp_free(conn, ObInnerSQLConnection::LABEL);
    } else if (OB_FAIL(conn->destroy())) {
      LOG_WARN("connection destroy failed", K(ret));
    } else if (OB_FAIL(free_conn(conn))) {
      LOG_WARN("free connection failed", K(ret));
    }
  }
  return ret;
}

// TODO : implement
int ObInnerSQLConnectionPool::escape(
    const char* from, const int64_t from_size, char* to, const int64_t to_size, int64_t& out_size)
{
  int ret = OB_SUCCESS;
  if (!inited_) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", K(ret));
  } else {
    if (NULL != from && from_size > 0) {
      if (to_size < from_size * 2) {
        ret = OB_BUF_NOT_ENOUGH;
        LOG_WARN("string buffer not enough", K(ret), K(from_size), K(to_size));
      } else if (NULL == to) {
        ret = OB_INVALID_ARGUMENT;
        LOG_WARN("to buffer is NULL", K(ret), KP(to), KP(from), K(from_size));
      } else {
        MEMCPY(to, from, from_size);
        out_size = from_size;
      }
    } else {
      out_size = 0;
    }
  }
  return ret;
}

int ObInnerSQLConnectionPool::alloc_conn(ObInnerSQLConnection*& conn)
{
  int ret = OB_SUCCESS;
  ObInnerSQLConnection* inner_sql_conn = NULL;
  if (!inited_) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", K(ret));
  } else if (stop_) {
    ret = OB_SERVER_IS_STOPPING;
    LOG_WARN("connection pool stoped", K(ret));
  } else {
    ObThreadCondGuard guard(cond_);
    if (free_conn_list_.is_empty()) {
      void* mem = allocator_.alloc(sizeof(*conn));
      if (NULL == mem) {
        ret = OB_ALLOCATE_MEMORY_FAILED;
        LOG_ERROR("alloc memory failed", K(ret), K_(total_conn_cnt));
      } else {
        total_conn_cnt_++;
        inner_sql_conn = new (mem) ObInnerSQLConnection();
      }
    } else {
      LinkNode* node = free_conn_list_.remove_first();
      node->~LinkNode();
      inner_sql_conn = new (node) ObInnerSQLConnection();
    }
    if (OB_FAIL(ret)) {
    } else if (OB_UNLIKELY(false == used_conn_list_.add_last(inner_sql_conn))) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("add connection to used connection list failed", K(ret));
    } else {
      conn = inner_sql_conn;
    }
  }

  if (total_conn_cnt_ >= WARNNING_CONNECTION_CNT && OB_LOG_NEED_TO_PRINT(WARN)) {
    ObThreadCondGuard guard(cond_);
    LOG_WARN("allocated too many connections, may be connection leak",
        K(ret),
        K_(total_conn_cnt),
        "free_conn_size",
        free_conn_list_.get_size());
    dump_used_conn_list();
  }

  return ret;
}

int ObInnerSQLConnectionPool::free_conn(ObInnerSQLConnection* conn)
{
  int ret = OB_SUCCESS;
  if (!inited_) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", K(ret));
  } else if (NULL == conn) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(ret), KP(conn));
  } else {
    ObThreadCondGuard guard(cond_);
    if (OB_UNLIKELY(NULL == used_conn_list_.remove(conn))) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("remove connection from used connection list failed", K(ret));
    } else {
      conn->~ObInnerSQLConnection();
      LinkNode* node = new (conn) LinkNode();
      if (!free_conn_list_.add_last(node)) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("add connection to free connection list failed", K(ret));
      } else {
        if (stop_ && 0 == used_conn_list_.get_size()) {
          cond_.signal();
        }
      }
    }
  }

  return ret;
}

int ObInnerSQLConnectionPool::wait()
{
  int ret = OB_SUCCESS;
  const int64_t WAIT_TIME_MS = 1000;
  const int64_t WARNNING_TIME_US = 5 * 1000 * 1000;
  const int64_t begin = ObTimeUtility::current_time();
  if (!inited_) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", K(ret));
  } else if (!stop_) {
    ret = OB_INNER_STAT_ERROR;
    LOG_WARN("not stoped", K(ret));
  } else {
    ObThreadCondGuard guard(cond_);
    while (used_conn_list_.get_size() > 0) {
      const int64_t now = ObTimeUtility::current_time();
      if (now - begin > WARNNING_TIME_US) {
        LOG_WARN("too much time used to wait connection release, may be connection leak",
            "used_time_ms",
            now - begin,
            "used connection count",
            used_conn_list_.get_size());
        dump_used_conn_list();
      }
      cond_.wait(WAIT_TIME_MS);
    }
  }
  return ret;
}

void ObInnerSQLConnectionPool::dump_used_conn_list()
{
  int64_t dump_size = MIN(used_conn_list_.get_size(), MAX_DUMP_SIZE);
  LOG_WARN("====== DUMP USED CONNECTIONS' BACKTRACE INFO ====== ", K(dump_size));
  DLIST_FOREACH_X(conn, used_conn_list_, (--dump_size >= 0))
  {
    if (OB_ISNULL(conn)) {
      LOG_WARN("node is null");
    } else {
      conn->dump_conn_bt_info();
    }
  }
}

int ObInnerSQLConnectionPool::on_client_inactive(ObISQLClient* client_addr)
{
  int ret = OB_SUCCESS;
  int64_t cnt = 0;
  // do not check ret in loop, continue on error happen.
  ObThreadCondGuard guard(cond_);
  DLIST_FOREACH_NORET(conn, used_conn_list_)
  {
    if (conn->get_associated_client() == client_addr) {
      if (NULL == ob_sql_ || NULL == ob_sql_->get_partition_service()) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("NULL partition service", K(ret));
      } else {
        int tmp_ret = sql::ObSQLSessionMgr::kill_query(conn->get_session(), ob_sql_->get_partition_service());
        if (OB_SUCCESS != tmp_ret) {
          LOG_WARN("kill query failed", K(tmp_ret));
          ret = OB_SUCCESS == ret ? tmp_ret : ret;
        }
        cnt++;
      }
    }
  }
  if (cnt > 0) {
    LOG_INFO("kill inner query", K(ret), K(cnt));
  }
  return ret;
}

}  // end namespace observer
}  // end namespace oceanbase
